bitkeeper revision 1.1052 (40ed38a6k6vT2ycbqz1BZA6U9KRVgg)
authormjw@wray-m-3.hpl.hp.com <mjw@wray-m-3.hpl.hp.com>
Thu, 8 Jul 2004 12:05:58 +0000 (12:05 +0000)
committermjw@wray-m-3.hpl.hp.com <mjw@wray-m-3.hpl.hp.com>
Thu, 8 Jul 2004 12:05:58 +0000 (12:05 +0000)
Start of save support in xfrd.

.rootkeys
tools/python/xen/xend/XendMigrate.py
tools/python/xen/xend/packing.py [deleted file]
tools/xfrd/xfrdClient.py

index 66c3c5ef7625cc99599d0c1a4eabe78fc500ac22..f953e63aea719105dfbc695606fe1e94a8627431 100644 (file)
--- a/.rootkeys
+++ b/.rootkeys
 40c9c468xzANp6o2D_MeCYwNmOIUsQ tools/python/xen/xend/XendVnet.py
 40c9c468x191zetrVlMnExfsQWHxIQ tools/python/xen/xend/__init__.py
 40c9c468S2YnCEKmk4ey8XQIST7INg tools/python/xen/xend/encode.py
-40e9808elkoRulOo1GxRTp5ulJGVNw tools/python/xen/xend/packing.py
 40c9c468DCpMe542varOolW1Xc68ew tools/python/xen/xend/server/SrvBase.py
 40c9c468IxQabrKJSWs0aEjl-27mRQ tools/python/xen/xend/server/SrvConsole.py
 40c9c4689Io5bxfbYIfRiUvsiLX0EQ tools/python/xen/xend/server/SrvConsoleDir.py
index a4a4183cebe8e6123ecf9d989632f8d5defa5bb7..4d19a772ac56b9a29647b0fe27e764715d37e9ca 100644 (file)
@@ -19,79 +19,41 @@ XFRD_PORT = 8002
 XFR_PROTO_MAJOR = 1
 XFR_PROTO_MINOR = 0
 
-class Migrate(Protocol):
+class Xfrd(Protocol):
+    """Protocol handler for a connection to the migration/save daemon xfrd.
+    """
 
-    def __init__(self, minfo):
+    def __init__(self, xinfo):
         self.parser = sxp.Parser()
-        self.minfo = minfo
+        self.xinfo = xinfo
 
     def connectionMade(self):
         # Send hello.
         self.request(['xfr.hello', XFR_PROTO_MAJOR, XFR_PROTO_MINOR])
-        # Send migrate.
-        vmconfig = self.minfo.vmconfig()
-        if not vmconfig:
-            self.loseConnection()
-            return
-        self.request(['xfr.migrate',
-                      self.minfo.src_dom,
-                      vmconfig,
-                      self.minfo.dst_host,
-                      self.minfo.dst_port])
+        # Send request.
+        self.xinfo.request(self)
 
     def request(self, req):
         sxp.show(req, out=self.transport)
-        self.transport.write(' \n')
 
     def loseConnection(self):
         self.transport.loseConnection()
 
     def connectionLost(self, reason):
-        self.minfo.closed(reason)
-
-    def dispatch(self, val):
-        op = sxp.name(val)
-        op = op.replace('.', '_')
-        if op.startswith('xfr_'):
-            fn = getattr(self, op, self.unknown)
-        else:
-            fn = self.unknown()
-        fn(val)
+        self.xinfo.connectionLost(reason)
 
     def dataReceived(self, data):
         self.parser.input(data)
         if self.parser.ready():
             val = self.parser.get_val()
-            self.dispatch(val)
+            self.xinfo.dispatch(val)
         if self.parser.at_eof():
             self.loseConnection()
             
-    def unknown(self, val):
-        print 'unknown>', val
 
-    def xfr_progress(self, val):
-        print 'xfr_progress>', val
-
-    def xfr_err(self, val):
-        # If we get an error with non-zero code the migrate failed.
-        # An error with code zero indicates hello success.
-        print 'xfr_err>', val
-        v = sxp.child(val)
-        print 'xfr_err>', type(v), v
-        err = int(sxp.child(val))
-        if not err: return
-        self.minfo.error(err);
-        self.loseConnection()
-
-    def xfr_ok(self, val):
-        # An ok indicates migrate completed successfully, and contains
-        # the new domain id on the remote system.
-        print 'xfr_ok>', val
-        dom = int(sxp.child(val))
-        self.minfo.ok(dom)
-        self.loseConnection()
-
-class MigrateClientFactory(ClientFactory):
+class XfrdClientFactory(ClientFactory):
+    """Factory for clients of the migration/save daemon xfrd.
+    """
 
     def __init__(self, minfo):
         #ClientFactory.__init__(self)
@@ -110,10 +72,63 @@ class MigrateClientFactory(ClientFactory):
     def clientConnectionFailed(self, connector, reason):
         print 'clientConnectionFailed>', 'connector=', connector, 'reason=', reason
 
+class XfrdInfo:
+    """Abstract class for info about a session with xfrd.
+    Has subclasses for save and migrate.
+    """
+    
+    def vmconfig(self):
+        print 'vmconfig>'
+        from xen.xend import XendDomain
+        xd = XendDomain.instance()
+
+        dominfo = xd.domain_get(self.src_dom)
+        print 'vmconfig>', type(dominfo), dominfo
+        if dominfo:
+            val = sxp.to_string(dominfo.sxpr())
+        else:
+            val = None
+        print 'vmconfig<', 'val=', type(val), val
+        return val
 
-class XendMigrateInfo:
+    def error(self, err):
+        self.state = 'error'
 
-    # states: begin, active, failed, succeeded?
+    def dispatch(self, xfrd, val):
+        op = sxp.name(val)
+        op = op.replace('.', '_')
+        if op.startswith('xfr_'):
+            fn = getattr(self, op, self.unknown)
+        else:
+            fn = self.unknown()
+        fn(xfrd, val)
+
+    def unknown(self, xfrd, val):
+        print 'unknown>', val
+
+    def xfr_err(self, xfrd, val):
+        # If we get an error with non-zero code the migrate failed.
+        # An error with code zero indicates hello success.
+        print 'xfr_err>', val
+        v = sxp.child(val)
+        print 'xfr_err>', type(v), v
+        err = int(sxp.child(val))
+        if not err: return
+        self.error(err);
+        xfrd.loseConnection()
+
+    def xfr_progress(self, val):
+        print 'xfr_progress>', val
+
+    def xfr_domain_pause(self, val):
+        print 'xfr__domain_pause>', val
+
+    def xfr_domain_suspend(self, val):
+        print 'xfr_domain_suspend>', val
+
+class XendMigrateInfo(XfrdInfo):
+    """Representation of a migrate in-progress and its interaction with xfrd.
+    """
 
     def __init__(self, id, dom, host, port):
         self.id = id
@@ -126,12 +141,6 @@ class XendMigrateInfo:
         self.start = 0
         self.deferred = defer.Deferred()
         
-    def set_state(self, state):
-        self.state = state
-
-    def get_state(self):
-        return self.state
-
     def sxpr(self):
         sxpr = ['migrate', ['id', self.id], ['state', self.state] ]
         sxpr_src = ['src', ['host', self.src_host], ['domain', self.src_dom] ]
@@ -142,35 +151,72 @@ class XendMigrateInfo:
         sxpr.append(sxpr_dst)
         return sxpr
 
-    def vmconfig(self):
-        print 'vmconfig>'
-        from xen.xend import XendDomain
-        xd = XendDomain.instance()
+    def request(self, xfrd):
+        vmconfig = self.vmconfig()
+        if not vmconfig:
+            xfrd.loseConnection()
+            return
+        xfrd.request(['xfr.migrate',
+                      self.src_dom,
+                      vmconfig,
+                      self.dst_host,
+                      self.d.dst_port])
+        
+    def xfr_migrate_ok(self, val):
+        dom = int(sxp.child0(val))
+        self.state = 'ok'
+        self.dst_dom = dom
 
-        dominfo = xd.domain_get(self.src_dom)
-        print 'vmconfig>', type(dominfo), dominfo
-        if dominfo:
-            val = sxp.to_string(dominfo.sxpr())
+    def connectionLost(self, reason=None):
+        if self.state =='ok':
+            eserver.inject('xend.migrate.ok', self.sxpr())
         else:
-            val = None
-        print 'vmconfig<', 'val=', type(val), val
-        return val
+            self.state = 'error'
+            eserver.inject('xend.migrate.error', self.sxpr())
 
-    def error(self, err):
-        self.state = 'error'
+class XendSaveInfo(XfrdInfo):
+    """Representation of a save in-progress and its interaction with xfrd.
+    """
+    
+    def __init__(self, id, dom, file):
+        self.id = id
+        self.state = 'begin'
+        self.src_dom = dom
+        self.file = file
+        self.start = 0
+        self.deferred = defer.Deferred()
+        
+    def sxpr(self):
+        sxpr = ['save',
+                ['id', self.id],
+                ['state', self.state],
+                ['domain', self.src_dom],
+                ['file', self.file] ]
+        return sxpr
 
-    def ok(self, dom):
+    def request(self, xfrd):
+        vmconfig = self.vmconfig()
+        if not vmconfig:
+            xfrd.loseConnection()
+            return
+        xfrd.request(['xfr.save', self.src_dom, vmconfig, self.file ])
+        
+    def xfr_save_ok(self, val):
+        dom = int(sxp.child0(val))
         self.state = 'ok'
-        self.dst_dom = dom
 
-    def closed(self, reason=None):
+    def connectionLost(self, reason=None):
         if self.state =='ok':
-            eserver.inject('xend.migrate.ok', self.sxpr())
+            eserver.inject('xend.save.ok', self.sxpr())
         else:
             self.state = 'error'
-            eserver.inject('xend.migrate.error', self.sxpr())
+            eserver.inject('xend.save.error', self.sxpr())
+    
 
 class XendMigrate:
+    """External api for interaction with xfrd for migrate and save.
+    Singleton.
+    """
     # Represents migration in progress.
     # Use log for indications of begin/end/errors?
     # Need logging of: domain create/halt, migrate begin/end/fail
@@ -224,7 +270,7 @@ class XendMigrate:
         id = self.nextid()
         info = XendMigrateInfo(id, dom, host, port)
         self._add_migrate(id, info)
-        mcf = MigrateClientFactory(info)
+        mcf = XfrdClientFactory(info)
         reactor.connectTCP('localhost', XFRD_PORT, mcf)
         return info
 
diff --git a/tools/python/xen/xend/packing.py b/tools/python/xen/xend/packing.py
deleted file mode 100644 (file)
index 760af7f..0000000
+++ /dev/null
@@ -1,329 +0,0 @@
-
-# XDR-style packer/unpacker for sxpr.
-#
-# string -> [STRING] [len:u16] <len bytes>
-# atom   -> [ATOM]   [len:u16] <len bytes>
-# int    -> [UINT]   [value]
-# list   -> [LIST]   {1 elt}* 0
-# null   -> [NULL]
-# none   -> [NONE]
-# bool   -> [BOOL]   { 0:u8 | 1:u8 }
-#
-# types packed as u16.
-#
-# So (a b c) -> [LIST] 1 a 1 b 1 c 0
-#    ()      -> [LIST] 0
-
-import struct
-
-try:
-    from cStringIO import StringIO as _StringIO
-except ImportError:
-    from StringIO import StringIO as _StringIO
-
-import types
-
-class Error(Exception):
-    
-    def __init__(self, msg):
-        self.msg = msg
-        
-    def __repr__(self):
-        return repr(self.msg)
-    
-    def __str__(self):
-        return str(self.msg)
-
-
-class ConversionError(Error):
-    pass
-
-BOOL_SIZE   = 1
-BOOL_FMT    = '>B'
-
-BYTE_SIZE   = 1
-BYTE_FMT    = '>b'
-UBYTE_FMT   = '>B'
-
-SHORT_SIZE  = 2
-SHORT_FMT   = '>h'
-USHORT_FMT  = '>H'
-
-INT_SIZE   =  4
-INT_FMT    = '>l'
-UINT_FMT   = '>L'
-
-NONE_CODE   = 0
-NULL_CODE   = 1
-INT_CODE    = 2
-STRING_CODE = 3
-ATOM_CODE   = 4
-BOOL_CODE   = 5
-LIST_CODE   = 10
-
-class Packer:
-    
-    def __init__(self, io=None):
-        self.reset(io=io)
-
-    def reset(self, io=None):
-        if io is None:
-            io = _StringIO()
-        self.io = io
-
-    def get_buffer(self):
-        return self.io.getvalue()
-
-    def get_io(self):
-        return self.io
-
-    def struct_pack(self, fmt, x):
-        try:
-            self.io.write(struct.pack(fmt, x))
-        except struct.error, msg:
-            raise ConversionError, msg
-
-    def pack_none(self):
-        pass
-    
-    def pack_bool(self, x):
-        # { '1' | '0' }
-        print 'bool>', x
-        if x:
-            self.io.write('\1')
-        else:
-            self.io.write('\0')
-
-    def pack_byte(self, x):
-        self.struct_pack(BYTE_FMT, x & 0xff)
-
-    def pack_char(self, x):
-        print 'char>', x
-        self.io.write(x)
-        
-    def pack_ubyte(self, x):
-        print 'ubyte>', x
-        self.struct_pack(UBYTE_FMT, x & 0xff)
-
-    def pack_ushort(self, x):
-        print 'ushort>', x
-        self.struct_pack(USHORT_FMT, x & 0xffff)
-        
-    def pack_short(self, x):
-        print 'short>', x
-        self.struct_pack(SHORT_FMT, x & 0xffff)
-
-    def pack_uint(self, x):
-        print 'uint>', x
-        self.struct_pack(UINT_FMT, x)
-        
-    def pack_int(self, x):
-        print 'int>', x
-        self.struct_pack(INT_FMT, x)
-
-    def pack_uhyper(self, x):
-        print 'uhyper>', x
-        self.pack_uint(x>>32 & 0xffffffffL)
-        self.pack_uint(x & 0xffffffffL)
-
-    pack_hyper = pack_uhyper
-
-    def pack_fstring(self, n, x):
-        print 'fstring>', x
-        self.io.write(x)
-
-    pack_fopaque = pack_fstring
-
-    def pack_string(self, x):
-        print 'string>', x
-        n = len(x)
-        self.pack_ushort(n)
-        self.pack_fstring(n, x)
-
-    pack_opaque = pack_string
-    pack_bytes = pack_string
-
-    def pack_list(self, x, pack_item):
-        print 'list>', x
-        # { '1' <item> }* '0'
-        for item in x:
-            self.pack_bool(1)
-            pack_item(item)
-        self.pack_bool(0)
-
-    def pack_farray(self, x, pack_item):
-        # <item>*
-        # Can pass n and check length - but is it worth it?
-        print 'farray>', list
-        for item in x:
-            pack_item(item)
-
-    def pack_array(self, x, pack_item):
-        # n <item>*n
-        print 'array>', x
-        self.pack_uint(len(x))
-        self.pack_farray(x, pack_item)
-
-class Unpacker:
-
-    def __init__(self, data):
-        self.reset(data)
-
-    def reset(self, data):
-        if isinstance(data, types.StringType):
-            data = _StringIO(data)
-        self.io = data
-
-    def get_bytes(self, n):
-        if n < 0:
-            raise ConversionError('negative byte count')
-        data = self.io.read(n)
-        return data
-
-    def struct_unpack(self, fmt, n):
-        data = self.get_bytes(n)
-        try:
-            return struct.unpack(fmt, data)[0]
-        except struct.error, msg:
-            raise ConversionError, msg
-       
-    def unpack_none(self):
-        return None
-
-    def unpack_bool(self):
-        return self.struct_unpack(BOOL_FMT, BOOL_SIZE)
-
-    def unpack_char(self):
-        return self.get_bytes(1)[0]
-
-    def unpack_byte(self):
-        return self.struct_unpack(BYTE_FMT, BYTE_SIZE)
-    
-    def unpack_ubyte(self):
-        return self.struct_unpack(UBYTE_FMT, BYTE_SIZE)
-    
-    def unpack_ushort(self):
-        return self.struct_unpack(USHORT_FMT, SHORT_SIZE)
-
-    def unpack_short(self):
-        return self.struct_unpack(SHORT_FMT, SHORT_SIZE)
-        
-    def unpack_uint(self):
-        x = self.struct_unpack(UINT_FMT, UINT_SIZE)
-        try:
-            return int(x)
-        except OverflowError:
-            return x
-
-    def unpack_int(self):
-        return self.struct_unpack(INT_FMT, INT_SIZE)
-
-    def unpack_uhyper(self):
-        hi = self.unpack_uint()
-        lo = self.unpack_uint()
-        return long(hi)<<32 | lo
-
-    def unpack_hyper(self):
-        x = self.unpack_uhyper()
-        if x >= 0x8000000000000000L:
-            x = x - 0x10000000000000000L
-        return x
-
-    def unpack_fstring(self, n):
-        return self.get_bytes(n)
-
-    unpack_fopaque = unpack_fstring
-
-    def unpack_string(self):
-        n = self.unpack_ushort()
-        return self.unpack_fstring(n)
-
-    unpack_opaque = unpack_string
-    unpack_bytes = unpack_string
-
-    def unpack_list(self, unpack_item):
-        list = []
-        while self.unpack_bool():
-            list.append(unpack_item())
-        return list
-
-    def unpack_farray(self, n, unpack_item):
-        list = []
-        for i in range(n):
-            list.append(unpack_item())
-        return list
-
-    def unpack_array(self, unpack_item):
-        n = self.unpack_ushort()
-        return self.unpack_farray(n, unpack_item)
-
-class SxpPacker(Packer):
-
-    pack_code = Packer.pack_ushort
-
-    def pack(self, x):
-        if isinstance(x, types.NoneType):
-            self.pack_code(NONE_CODE)
-            self.pack_none()
-        elif isinstance(x, types.IntType):
-            self.pack_code(INT_CODE)
-            self.pack_int(x)
-        elif isinstance(x, types.StringType):
-            self.pack_code(STRING_CODE)
-            self.pack_string(x)
-        elif isinstance(x, types.ListType):
-            self.pack_code(LIST_CODE)
-            self.pack_list(x, self.pack)
-        else:
-           raise Error('invalid type ' + str(type(x)))
-
-class SxpUnpacker(Unpacker):
-
-    unpack_code = Unpacker.unpack_ushort
-
-    def unpack(self):
-        code = self.unpack_code()
-        if code == NONE_CODE:
-            val = self.unpack_none()
-        elif code == INT_CODE:
-            val = self.unpack_int()
-        elif code == BOOL_CODE:
-            val = self.unpack_bool()
-        elif code == STRING_CODE:
-            val = self.unpack_string()
-        elif code == ATOM_CODE:
-            val = self.unpack_string()
-        elif code == LIST_CODE:
-            val = self.unpack_list(self.unpack)
-        else:
-            raise Error('invalid code ' + str(code))
-        return val
-
-def main():
-    d = "['vfarm', ['@', ['name', 'vfarm1']], ['memory', 1024], ['image', 'splinux'], ['args', 'root=/dev/nfs ip=dhcp'], [ 1, -1, 1000000]]"
-    print"> len=", len(d), "d=", d
-    obj = ['vfarm', ['@', ['name', 'vfarm1']],
-           ['memory', 1024],
-           ['image', 'splinux'],
-           ['args', 'root=/dev/nfs ip=dhcp'],
-           [ 1, -1, 1000000] ]
-    print "> obj=", obj
-    pack = SxpPacker()
-    pack.pack(obj)
-    data = pack.get_buffer()
-    print "> len=", len(data), "data=", data
-    unpack = SxpUnpacker(data)
-    obj_unpack = unpack.unpack()
-    print "> obj=", obj_unpack
-    #obj = [100,101,102, 999.00234, { 'a': 1, 'b': 2 } ]
-    #pack.reset()
-    #pack.pack_item(obj)
-    #data = pack.get_buffer()
-    #print "> obj=", obj
-    #print "> len=", len(data), "data=", data
-    #unpack.reset(data)
-    #obj_unpack = unpack.unpack_item()
-    #print "> obj=", obj_unpack
-    
-if __name__ == "__main__":
-    main()
index 1f1fc22407cc70db3e1e7ad6f1db60c8aa3b6f91..4badf454db7cc9aaf5d9ec5133d96e1b878b1eb5 100755 (executable)
@@ -14,7 +14,6 @@ import StringIO
 sys.path.append("/home/mjw/repos-bk/xeno-unstable.bk/tools/python")
 
 import xen.xend.sxp as sxp
-from xen.xend.packing import SxpPacker, SxpUnpacker
 
 XFRD_PORT = 8002